package types

import (
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/anypb"

	"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
	"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
)

// BroadcastAppendResult is the result of a broadcast append operation.
// It carries the id of the broadcast and one append result per channel.
type BroadcastAppendResult struct {
	BroadcastID   uint64                   // the broadcast id of the append operation.
	AppendResults map[string]*AppendResult // maps the channel name to the append result.
}

// GetAppendResult looks up the append result recorded for channelName.
// It returns nil when AppendResults has no entry for that channel.
func (r *BroadcastAppendResult) GetAppendResult(channelName string) *AppendResult {
	if result, ok := r.AppendResults[channelName]; ok {
		return result
	}
	return nil
}

// AppendResult is the result of a single append operation.
type AppendResult struct {
	// MessageID is generated by the underlying walimpls.
	MessageID message.MessageID

	// LastConfirmedMessageID is the last confirmed message id.
	// Starting from this message id, a reader can read all the messages whose
	// timetick is greater than the TimeTick in the response.
	LastConfirmedMessageID message.MessageID

	// TimeTick is the time tick of the message.
	// Set by the timetick interceptor.
	TimeTick uint64

	// TxnCtx is the transaction context of the message.
	// If the message does not belong to a transaction, TxnCtx is nil.
	TxnCtx *message.TxnContext

	// Extra is the extra information of the append result, packed as a
	// protobuf Any. Use GetExtra to unpack it into a concrete message.
	Extra *anypb.Any
}

// GetExtra unmarshals the extra information stored in Extra into m.
// Unknown fields are discarded and partial messages are accepted.
// The error reported by anypb.UnmarshalTo is returned unchanged.
func (r *AppendResult) GetExtra(m proto.Message) error {
	opts := proto.UnmarshalOptions{
		AllowPartial:   true,
		DiscardUnknown: true,
	}
	return anypb.UnmarshalTo(r.Extra, m, opts)
}

// IntoProto builds the protobuf representation of the append result.
// MessageID, TxnCtx and LastConfirmedMessageID are converted through their
// own IntoProto methods; TimeTick and Extra are copied as-is (Extra is shared
// with the returned message, not cloned).
func (r *AppendResult) IntoProto() *streamingpb.ProduceMessageResponseResult {
	result := new(streamingpb.ProduceMessageResponseResult)
	// Conversions run in the same order as the fields are listed here.
	result.Id = r.MessageID.IntoProto()
	result.Timetick = r.TimeTick
	result.TxnContext = r.TxnCtx.IntoProto()
	result.Extra = r.Extra
	result.LastConfirmedId = r.LastConfirmedMessageID.IntoProto()
	return result
}

// NewAppendResponseN creates an AppendResponses that holds n zero-valued
// responses (nil AppendResult and nil Error), ready to be filled in.
func NewAppendResponseN(n int) AppendResponses {
	var resps AppendResponses
	resps.Responses = make([]AppendResponse, n)
	return resps
}

// AppendResponse is the response of one append operation.
// It pairs the result of the operation with its error.
type AppendResponse struct {
	// AppendResult is the result of the append operation; it may be nil.
	AppendResult *AppendResult
	// Error is the error of the append operation; nil means no error was recorded.
	Error error
}

// AppendResponses holds the responses of multiple append operations.
type AppendResponses struct {
	// Responses contains one AppendResponse per append operation.
	// NOTE(review): presumably indexed in the same order as the appended
	// messages — confirm against callers.
	Responses []AppendResponse
}

// MaxTimeTick returns the largest TimeTick among the responses that carry a
// non-nil AppendResult. It returns 0 when no response has an AppendResult.
func (a AppendResponses) MaxTimeTick() uint64 {
	latest := uint64(0)
	for i := range a.Responses {
		result := a.Responses[i].AppendResult
		if result == nil {
			// Responses without a result do not contribute a time tick.
			continue
		}
		if result.TimeTick > latest {
			latest = result.TimeTick
		}
	}
	return latest
}

// UnwrapFirstError scans the responses in order and returns the first
// non-nil Error. It returns nil when no response carries an error.
func (a AppendResponses) UnwrapFirstError() error {
	for i := range a.Responses {
		if err := a.Responses[i].Error; err != nil {
			return err
		}
	}
	return nil
}

// FillAllError sets the Error of every response to err.
// The AppendResult of each response is left untouched.
func (a *AppendResponses) FillAllError(err error) {
	for i := 0; i < len(a.Responses); i++ {
		resp := &a.Responses[i]
		resp.Error = err
	}
}

// FillResponseAtIdx overwrites the response stored at position idx with resp.
// It panics if idx is out of the range of Responses.
func (a *AppendResponses) FillResponseAtIdx(resp AppendResponse, idx int) {
	slot := &a.Responses[idx]
	*slot = resp
}

// FillAllResponse overwrites every response with resp.
// Each entry receives a copy of resp, so all entries end up sharing the same
// AppendResult pointer and Error value.
func (a *AppendResponses) FillAllResponse(resp AppendResponse) {
	for i := 0; i < len(a.Responses); i++ {
		a.Responses[i] = resp
	}
}
